---
title: Source Extensions
---

# Source Extensions (Plugins)

1. [Overview](#overview)
2. [Quickstart](#quickstart)
3. [Project Structure](#project-structure)
4. [Source Extension Interface](#source-extension-interface)
5. [Source Config](#source-config)
6. [Catalog of Streams](#catalog-of-streams)
7. [Pulling data](#pulling-data)
8. [Descriptor](#descriptor)
9. [Testing](#testing)
10. [Adding to Jitsu](#adding-to-jitsu)
11. [Advanced](#more-details)



## Overview

Jitsu Source Plugins allow anyone to implement a new source type for Jitsu
and publish it to make it available for all users of Jitsu.

Source Plugins designed to pull data from third party services.
Each source may have multiple types of data stream, e.g.: users and events.

## Quickstart

We need to use Jitsu SDK's CLI tool to bootstrap a project for new source plugin:
```shell
npx jitsu-cli extension create --type source
```

`nodejs` and `npx` must be installed on your system.

`jitsu-cli` creates a project for a source plugin. All parts are working, but they are placeholder implementation and don't do anything meaningful –
sample source project simply returns one data row with run_number and configuration parameters.

If you are an experienced developer, you can start replacing placeholder logic with your own right away.

## Project Structure

jitsu-cli generates project directory structure with a set of files typical for Typescript node.js project:
```text
├── package.json
├── tsconfig.json
├── src
│   └── index.ts
└── __test__
```

* `package.json` – file contains meta-information about npm project including name, version
* `src/index.ts` – file contains the main logic along with plugin descriptor, config objects, source stream catalog
* `__test__/` – directory for tests
* `tsconfig.json` – settings for Typescript compiler. No need to change that

## Source Extension Interface

Jitsu Source Extension must export following symbols:
* `descriptor` - instance of `ExtensionDescriptor`, to describe the extension - name, icon, config params
* `validator` - instance of`ConfigValidator`, to run validation again
* `sourceCatalog` - instance of `SourceCatalog`, to describe the data
* `streamReader` - instance of `StreamReader`, to

For more on these types please check files with full type descriptions, or check out `@jitsu/types` npm package

* [extension.d.ts](https://github.com/jitsucom/jitsu-sdk/blob/main/core/jitsu-types/extension.d.ts)
* [sources.d.ts](https://github.com/jitsucom/jitsu-sdk/blob/main/core/jitsu-types/sources.d.ts)

To make concrete implementation of these types we need to define our types for Source config and Stream config.

## Source Config

Source Config must contain properties required to connect the third party service, e.g.:
`userId`, `password`, or `apiKey` or for oauth2.0: `clientId`, `clientSecret`, `accessToken`,...
also it may contain configuration parameters of data streams that are common for all source's stream type, e.g.:
`start_date`,`refresh_window`

Starting from here we will use Airtable source implementation for examples:

```typescript
export interface AirtableConfig {
  //API Key. Read on how to get API key here: https://support.airtable.com/hc/en-us/articles/219046777-How-do-I-get-my-API-key-
  apiKey: string;
  //Base ID. Read how to get Base ID: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs
  baseId: string;
}
```

If you're using [Jitsu Configurator](/docs/configuration-ui), those parameters will be displayed properly in the UI

### Validating Config

Once we have the source config, we implement the `validator`.
The validator is an optional part, but highly recommend

In the validator we need to check whether provided config grants access to the API.

```typescript
async function validator(config: AirtableConfig): Promise<ConfigValidationResult> {
  console.log(`Will connect to airtable with apiKey=${config.apiKey} and baseId=${config.baseId}`);
  let res = await fetch("https://api.airtable.com/v0/meta/bases/" + config.baseId + "/tables", {
    method: "get",
    headers: [["Authorization", "Bearer " + config.apiKey]],
  });
  if (res.headers?.get("Content-Type").indexOf("application/json") >= 0) {
    let json = await res.json();
    if (json.error) {
      return { ok: false, message: `${json.error?.type || json.error} ${json.error?.message || ""}`.trim() };
    } else {
      return true
    }
  } else {
    return { ok: false, message: `Error Code: ${res.status} msg: ${await res.text()}` };
  }
}
```

Now we can test our validator with validate-config action that jitsu-cli already added to the project

```shell
yarn build && yarn validate-config -c '{"apiKey":"keyMy", "baseId":"app1234"}'
```
alternatively config may be stored in JSON-file:
```shell
yarn build && yarn validate-config -c config.json
```

If everything is fine, we should get the following output:
```text
[info ] - 🤔 Validating configuration {"apiKey":"keyMy", "baseId":"app1234"}
Will connect to airtable with apiKey=keyMy and baseId=app1234
[info ] - ✅ Config is valid. Hooray!
[info ] - ✨ Done
```

## Catalog of Streams

The source extension need a way to tell Jitsu what types of Streams it supports, and those Streams should be configured.

Think of streams as tables in database. In fact, each stream will be represented as a table in destination database.


First we need to declare type from `StreamConfig`:

```typescript
export interface TableStreamConfig {
  //Table Id. Read how to get table id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs
  tableId: string;
  //View Id (Optional). Read how to get view id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs
  viewId?: string;
  //Fields. Comma separated list of field names. If empty or undefined - all fields will be downloaded
  fields?: string;
}
```

And then declare `sourceCatalog` function that returns an array of `StreamPrototype` type:

```typescript
const sourceCatalog: SourceCatalog<AirtableConfig, TableStreamConfig> = async (config: AirtableConfig) => {
  return [
    {
      type: "table",
      supportedModes: ["full_sync"],
      params: [
        {
          id: "tableId",
          displayName: "Table Id",
          documentation:
                  "Read how to get table id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs",
          required: true,
        },
        {
          id: "viewId",
          displayName: "View Id",
          documentation:
                  "Read how to get view id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs",
          required: false,
        },
        {
          id: "fields",
          displayName: "Fields",
          documentation: "Comma separated list of field names. If empty or undefined - all fields will be downloaded",
          required: false,
        },
      ],
    },
  ];
};
```

In this example, the source exports on stream type (called `table`), which is configurable with `tableId`, `viewId` and `fields` parameter. User
can configure a multiple instances of one stream type. In case of Airtable, each stream will represent a view of a table. Optionally, with a
subset of fields (see `fields` parameter)

## Pulling data

The main logic happens in `streamReader` function. This function pulls data from third party API and send it to Jitsu Server
as **messages** through `StreamSink` interface (see detailed description of [all message types below](/docs/extending/source-plugins#message-types))

```typescript
const streamReader: StreamReader<AirtableConfig, TableStreamConfig> = async (
        sourceConfig: AirtableConfig,
        streamType: string,
        streamConfiguration: StreamConfiguration<TableStreamConfig>,
        streamSink: StreamSink,
        services: { state: StateService }
) => {
  const airtable = new Airtable({ apiKey: sourceConfig.apiKey });

  let table = airtable.base(sourceConfig.baseId).table(streamConfiguration.parameters.tableId);
  const selectedFields = streamConfiguration.parameters.fields
          ? streamConfiguration.parameters.fields.split(",").map(f => f.trim())
          : [];
  let selectParams: QueryParams<any> = {};
  if (selectedFields.length > 0) {
    streamSink.log("INFO", "Fields filter: " + JSON.stringify(selectedFields));
    selectParams.fields = selectedFields;
  }
  if (streamConfiguration.parameters.viewId) {
    selectParams.view = streamConfiguration.parameters.viewId;
  }
  let allRecords = await table.select(selectParams).all();
  allRecords.forEach(r => {
    const { id, createdTime, fields } = r._rawJson;
    //add record message
    streamSink.addRecord({
      $id: id,
      $recordTimestamp: new Date(createdTime),
      __sql_type_created: "timestamp with time zone",
      ...fields,
    });
  });
};
```


## Descriptor

`descriptor` object is used by Jitsu to build Source UI form with proper descriptions and documentations:

```typescript
const descriptor: ExtensionDescriptor = {
  id: "airtable",
  displayName: "Airtable Source",
  icon: "<svg xmlns=\"http://www.w3.org/2000/svg\" viewBox=\"0 0 64 64\" width=\"100%\" height=\"100%\"><path d=\"M28.578 5.906L4.717 15.78c-1.327.55-1.313 2.434.022 2.963l23.96 9.502a8.89 8.89 0 0 0 6.555 0l23.96-9.502c1.335-.53 1.35-2.414.022-2.963l-23.86-9.873a8.89 8.89 0 0 0-6.799 0\" fill=\"#fc0\"/><path d=\"M34.103 33.433V57.17a1.6 1.6 0 0 0 2.188 1.486l26.7-10.364A1.6 1.6 0 0 0 64 46.806V23.07a1.6 1.6 0 0 0-2.188-1.486l-26.7 10.364a1.6 1.6 0 0 0-1.009 1.486\" fill=\"#31c2f2\" /> <path d=\"M27.87 34.658l-8.728 4.215-16.727 8.015c-1.06.512-2.414-.26-2.414-1.44V23.17c0-.426.218-.794.512-1.07a1.82 1.82 0 0 1 .405-.304c.4-.24.97-.304 1.455-.112l25.365 10.05c1.3.512 1.4 2.318.133 2.925\" fill=\"#ed3049\" /><path d=\"M27.87 34.658l-7.924 3.826L.512 22.098a1.82 1.82 0 0 1 .405-.304c.4-.24.97-.304 1.455-.112l25.365 10.05c1.3.512 1.4 2.318.133 2.925\" fill=\"#c62842\" /></svg>",
  description: "This source pulls data from Airtable base",
  configurationParameters: [
    {
      id: "apiKey",
      displayName: "API Key",
      documentation:
              "Read on how to get API key here: https://support.airtable.com/hc/en-us/articles/219046777-How-do-I-get-my-API-key-",
      required: true,
    },
    {
      id: "baseId",
      displayName: "Base Id",
      documentation:
              "Read how to get Base ID: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs",
      required: true,
    },
  ],
};
```

## Testing

We need Jitsu Server to run the Source extension, but `jitsu-cli` created our project with the `execute` action that allows executing the source in command line mode with provided configs.
Pass Source Config json to with `-c` parameter and StreamConfig with `-s`
If source supports multiple types of streams it is required to pass stream type with `stream` parameter of StreamConfig object
```shell
yarn execute -c '{apiKey: "keyMy", baseId:"app1234"}' -s '{stream: "table", tableId:"tblMy"}'
```
Result will look like that:
```text
[info ] - 🏃 Getting available streams...
[info ] - 🌊 Stream: table. Parameters: tableId - Table Id, viewId - View Id, fields - Fields
[info ] - [record] {"$id":"rec55g5x7GyRkz0Ex","$recordTimestamp":"2020-06-11T01:30:22.000Z","Client":["recNvBXPsvaiu5QaH"],"Due date":"2020-11-01","Project lead":{"id":"usrSdmrY5yGdbcZzg","email":"katherineduh+collab23@demo.htable.com","name":"Leslie Walker"},"Category":"Technology design","Name":"Lemon headband","Project team":[{"id":"usr6hWARwVNgmt3WW","email":"katherineduh+collab35@demo.htable.com","name":"Sam Epps"},{"id":"usru7j5m2lcNhriKv","email":"katherineduh+collab7@demo.htable.com","name":"Cameron Toth"},{"id":"usrSdmrY5yGdbcZzg","email":"katherineduh+collab23@demo.htable.com","name":"Leslie Walker"}],"Tasks":["recLLYKORkbdzQV1g","recUyxiAcW4x56HBH"],"Kickoff date":"2020-10-18"}
[info ] - 🏁 Result data:
[
  {
    "$id": "rec55g5x7GyRkz0Ex",
    "$recordTimestamp": "2020-06-11T01:30:22.000Z",
    "Client": "[\"recNvBXPsvaiu5QaH\"]",
    "Due date": "2020-11-01",
    "Project lead": {
      "id": "usrSdmrY5yGdbcZzg",
      "email": "katherineduh+collab23@demo.htable.com",
      "name": "Leslie Walker"
    },
    "Category": "Technology design",
    "Name": "Lemon headband",
    "Project team": "[{\"id\":\"usr6hWARwVNgmt3WW\",\"email\":\"katherineduh+collab35@demo.htable.com\",\"name\":\"Sam Epps\"},{\"id\":\"usru7j5m2lcNhriKv\",\"email\":\"katherineduh+collab7@demo.htable.com\",\"name\":\"Cameron Toth\"},{\"id\":\"usrSdmrY5yGdbcZzg\",\"email\":\"katherineduh+collab23@demo.htable.com\",\"name\":\"Leslie Walker\"}]",
    "Tasks": "[\"recLLYKORkbdzQV1g\",\"recUyxiAcW4x56HBH\"]",
    "Kickoff date": "2020-10-18"
  }
]
Special column types:

[info ] - ✨ Done
```
First jitsu-cli prints all messages received from source. Then it prints array of all received objects.

## Adding to Jitsu

To add our plugin to jitsu we need to build and publish it.

To build plugin code use:
```shell
yarn build
```

### Publishing to NPM Repository

Publishing plugin to public npm repository to make it available for other users.
You need to have an account in https://www.npmjs.com

The following commands in the project directory will publish the package to the npmjs repository:
```shell
npm login
npm publish
```
npm will ask to provide some additional details to complete the publishing.

### Setting up Jitsu Server

Users of a standalone jitsu server can setup a source based on plugin since version 1.42.
Add a new source of type **sdk_source**, information about plugin package and version, and config to `eventnative.yaml` file:
```yaml
sources:
...
  myairtable:
    type: sdk_source
    destinations:
      - target_destination
    collections:
      - name: table
        type: table
        table_name: myairtable_table
        schedule: '@daily'
        parameters:
          tableId: tblMy
    schedule: '@daily'
    config:
      apiKey: keyMy
      baseId: app1234
      package_name: jitsu-airtable-source
      package_version: ^0.7.2
```

### Setting up Jitsu Joint Image or Configurator UI

[The Configurator](/docs/configuration-ui) doesn't support an addition of custom sources yet
To make your source plugin appear in Jitsu Configurator UI please create a new ticket or pull request in the
[jitsu repository](https://github.com/jitsucom/jitsu)

## More Details

In previous sections we have reviewed process of source creation based on Airtable source as an example.
But this source implementation doesn't cover all features and modes that Source Extension can use.
Here we give more details on stream sink modes, data type conversions, persistent state management and more

### Message types

Source Extension communicates with Jitsu Server by means of `StreamSink` object and messages of `JitsuDataMessage` type.
Jitsu supports following message types:

| Type                | StreamSink helper method | Mode        | Payload         | Description                                                                                                                                                                                                                                                                                                                               |
|:--------------------|:-------------------------|-------------|-----------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **record**          | `addRecord`              | both        | `DataRecord`    | The main message that sends a single data object to the stream.                                                                                                                                                                                                                                                                           |
| **clear_stream**    | `clearStream`            | incremental | no              | In case incremental Source need to clear all previous date. Truncates table associated with stream in target destinations.<br/>When used `clear_stream` message must be the first message in transaction and can be used only once per stream.                                                                                            |
| **state**           | `changeState`            | both        | any object      | Stores provided object in persistent storage.                                                                                                                                                                                                                                                                                             |
| **new_transaction** | `newTransaction`         | incremental | no              | Starts a new transaction. Previous transaction will be committed to the destination. All consequent messages will be processed in the new transaction.<br/>First transaction is started implicitly even in the absence of `new_transaction` message.                                                                                      |
| **delete_records**  | `deleteRecords`          | incremental | `DeleteRecords` | Deletes records that belongs to certain time period from destination storages.<br/>Time period is controlled with DeleteRecords's `partitionTimestamp` and `granularity` parameters.<br/>Requires  `DataRecord` objects to have `$recordTimestamp` set with value.<br/>`delete_records` must precede any `record` messages in transaction |
| **log**             | `log`                    | both        | `LogRecord`     | Prints log message to the Jitsu server task log.                                                                                                                                                                                                                                                                                          |
| **schema**          | `schema`                 | both        | `StreamSchema`  | Defines stream schema. Not implemented yet


### Sync modes

Each stream type can be declared to support `full_sync` mode, `incremental` or both.
If both modes are supported, 'mode' parameter in `StreamConfiguration` object must be set to `full_sync` or `incremental`.
Supported modes are declared in `supportedModes` property of StreamInstance's returned by `sourceCatalog`

#### Full Sync
In Full Sync mode a Source Extension must download all data for the stream on each run.

Full Sync mode is recommended for the most of the Sources.
Exceptions are sources that have big amount of data that it takes long time to process or services that charge money from pulling the data.
For such case Incremental mode may be a good choice despite being little harder to implement.

#### Incremental
In Incremental mode a Source Extension download only fresh data on each run.
Incremental mode usually requires Source to save some information between runs, e.g. ID or _timestamp of last loaded data record.
To save data between run use `state` message. See [Persistent State](#persistent-state) section for more details.

#### Refresh window

Some third party services may update data during a day and other service may even update data for longer periods like 30-day.
It is still possible to use Incremental mode with such services. But we need to make sure that we delete all previous data
for current refresh window before loading it again.
For that we need to:
* Set `$recordTimestamp` for all record object that we send to Jitsu.
* Process stream data using date chunks(partitions). Choose granularity - `DAY` probably will suit for the most cases.
* Start `new_transaction` for each new date chunk processed.
* Add `delete_records` message at the beginning of each new transaction(including the first one)

You can use Jitsu Google-Analytics source as an example of Refresh Window implementation:
https://github.com/jitsucom/jitsu-sdk/blob/main/source-connectors/google-analytics

### Persistent State
It may be useful to make some information from previous source run available for consequent runs.
For that purpose Source Extension may send any custom object as a payload with `state` message.
Jitsu Service persist that object in meta storage and make it available to the `streamReader` via `StateService`
We recommend using `StateService` for modifying state too instead of sending `state` message directly.

To test that your source work correctly with state you can pass path of the file containing state object to the `execute` script using `-t` parameter:
```shell
yarn execute -c '{apiKey: "keyMy", baseId:"app1234"}' -s '{stream: "table", tableId:"tblMy"}'  -t ./state.json
```

### Record $id
It is required to set `$id` property for each record.<br/>
Usually Source data has some ID associated with its entities.
When this is not the case you can use helper function `buildSignatureId`  from `@jitsu/jlib/lib/sources-lib`
to generate id as hash of object data.

### Flattening

Source Extension is allowed to use object with nested objects as `DataRecord`.
If target destination is SQL based warehouse - Jitsu will automatically convert record to flat structure.
Otherwise, object structure will be left intact.
If you want to force flat structure for all warehouses you can use `flatten` function from `@jitsu/jlib` module

### Data types mapping

Jitsu assumes types of data record parameters in destination based on original type
with exception to dates represented as string in simplified extended ISO format: `YYYY-MM-DDTHH:mm:ss.sssZ` – such string will be converted to the Date type in the destination storage

### Date timezone
Jitsu runs Sources with default timezone set to `UTC`